#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"
#include "volume_ctl_internal.h"


/**
 * @brief Build the rollback context of a volume once, up front, so that the
 *        chunk-level rollback code does not have to recompute it.
 *
 * Fills volume_proto->rollback_ctx from table1.fileinfo:
 *  - auto_snap / from_snap versions and their chkinfo buffers,
 *  - root_snap_list:       list_parent() of fileinfo->snap_from,
 *  - auto_snap_list:       volume_proto_snapshot_buildlist() from snap_from to
 *                          snap_rollback with the fourth argument set to 1,
 *  - downstream_snap_list: list_descendant() of fileinfo->snap_rollback,
 *  - cross_snap_list:      volume_proto_snapshot_buildlist() from snap_from to
 *                          snap_rollback with the fourth argument set to 0.
 * Every cross_snap_list entry whose snap_version is not yet present in
 * downstream_snap_list is then moved to the tail of downstream_snap_list.
 *
 * @note All of these values must stay unchanged for the whole rollback.
 *       How to reload them if they do change is an open question.
 *
 * @param volume_proto volume whose rollback_ctx is built; ctx->inited must be 0
 * @return 0 on success (ctx->inited is set to 1); otherwise the error code of
 *         the failing step, with ctx->inited left at 0 and every snapshot list
 *         built so far released.
 */
int rollback_context_init(volume_proto_t *volume_proto) {
        int ret;
        table1_t *table1 = &volume_proto->table1;
        fileinfo_t *fileinfo = &table1->fileinfo;

        DINFO("enter\n");

        rollback_ctx_t *ctx = &volume_proto->rollback_ctx;

        YASSERT(ctx->inited == 0);

        /* all four heads are valid (empty) before the first possible failure */
        INIT_LIST_HEAD(&ctx->root_snap_list);
        INIT_LIST_HEAD(&ctx->auto_snap_list);
        INIT_LIST_HEAD(&ctx->downstream_snap_list);
        INIT_LIST_HEAD(&ctx->cross_snap_list);

        /* point the chkinfo pointers at the storage embedded in the context */
        ctx->auto_snap_chkinfo = (void *)ctx->__auto_snap_chkinfo;
        ctx->from_snap_chkinfo = (void *)ctx->__from_snap_chkinfo;

        // auto snap
        ctx->auto_snap = fileinfo->snap_version;
        ctx->from_snap = fileinfo->snap_from;

        /* a failed lookup of the auto snap is tolerated and only recorded */
        ret = table1->snapshot_getbyversion(table1, ctx->auto_snap, ctx->auto_snap_chkinfo, NULL);
        ctx->has_auto_snap = (ret == 0) ? 1 : 0;

        /* the snapshot we roll back from must be resolvable */
        ret = table1->snapshot_getbyversion(table1, ctx->from_snap, ctx->from_snap_chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // root snap list
        ret = table1->table_proto->snap->list_parent(table1->table_proto,
                                                     fileinfo->snap_from,
                                                     &ctx->root_snap_list);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // auto snap list
        ret = volume_proto_snapshot_buildlist(volume_proto,
                                              fileinfo->snap_from,
                                              fileinfo->snap_rollback,
                                              1,
                                              &ctx->auto_snap_list);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // downstream snap list
        ret = table1->table_proto->snap->list_descendant(table1->table_proto,
                                                         fileinfo->snap_rollback,
                                                         &ctx->downstream_snap_list);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // cross snap list
        ret = volume_proto_snapshot_buildlist(volume_proto,
                                              fileinfo->snap_from,
                                              fileinfo->snap_rollback,
                                              0,
                                              &ctx->cross_snap_list);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /*
         * Move every cross snapshot that is not already a downstream snapshot
         * (compared by snap_version) to the tail of downstream_snap_list.
         * The outer iteration must be the _safe variant because pos is unlinked.
         */
        struct list_head *pos, *n;
        struct list_head *pos2, *n2;
        snap_t *snap1, *snap2;
        int found;
        list_for_each_safe(pos, n, &ctx->cross_snap_list) {
                snap1 = list_entry(pos, snap_t, hook);
                found = 0;
                list_for_each_safe(pos2, n2, &ctx->downstream_snap_list) {
                        snap2 = list_entry(pos2, snap_t, hook);
                        if (snap1->snap_version == snap2->snap_version) {
                                found = 1;
                                break;
                        }
                }

                if (!found) {
                        list_del_init(pos);
                        list_add_tail(pos, &ctx->downstream_snap_list);
                }
        }

        ctx->inited = 1;

        return 0;
err_ret:
        /*
         * ctx->inited is still 0, so rollback_context_destroy() would skip the
         * lists and then wipe the context, leaking whatever was built before
         * the failure. Release it here instead.
         * NOTE(review): assumes __snapshot_list_free() accepts an empty,
         * initialised list head -- confirm against its definition.
         */
        __snapshot_list_free(&ctx->root_snap_list);
        __snapshot_list_free(&ctx->auto_snap_list);
        __snapshot_list_free(&ctx->downstream_snap_list);
        __snapshot_list_free(&ctx->cross_snap_list);
        return ret;
}

/**
 * Tear down the rollback context of a volume.
 *
 * When the context was initialised, its four snapshot lists are released in
 * the order root, auto, downstream, cross. The whole context is then zeroed,
 * which also clears the inited flag.
 *
 * @param volume_proto volume whose rollback_ctx is torn down
 * @return always 0
 */
int rollback_context_destroy(volume_proto_t *volume_proto)
{
        rollback_ctx_t *ctx = &volume_proto->rollback_ctx;

        if (ctx->inited) {
                struct list_head *lists[] = {
                        &ctx->root_snap_list,
                        &ctx->auto_snap_list,
                        &ctx->downstream_snap_list,
                        &ctx->cross_snap_list,
                };
                unsigned int idx;

                for (idx = 0; idx < sizeof(lists) / sizeof(lists[0]); idx++) {
                        __snapshot_list_free(lists[idx]);
                }
        }

        memset(ctx, 0, sizeof(*ctx));

        return 0;
}

/**
 * Rebuild the rollback context of a volume from scratch.
 *
 * Destroys the current context (which also clears its inited flag) and then
 * initialises it again from the volume's current fileinfo.
 *
 * @param volume_proto volume whose rollback_ctx is rebuilt
 * @return 0 on success, otherwise the error code from rollback_context_init();
 *         in that case the context is left uninitialised (inited == 0).
 */
int rollback_context_reload(volume_proto_t *volume_proto) {
        int ret;

        rollback_context_destroy(volume_proto);

        /* report a failed rebuild instead of claiming success */
        ret = rollback_context_init(volume_proto);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Log the rollback context of a volume at DINFO level.
 *
 * Nothing is logged when the context is not initialised. Otherwise this
 * prints a summary line, one line per entry of the root, auto, downstream
 * and cross snapshot lists (in that order), and finally the counters kept in
 * ctx->stat.
 *
 * @param volume_proto volume whose rollback_ctx is dumped
 * @return always 0
 */
int rollback_context_dump(volume_proto_t *volume_proto)
{
        rollback_ctx_t *ctx = &volume_proto->rollback_ctx;
        table1_t *table1 = &volume_proto->table1;
        fileinfo_t *fileinfo = &table1->fileinfo;
        struct list_head *pos, *n;
        snap_t *snap;

        /* nothing to report until the context has been built */
        if (!ctx->inited) {
                return 0;
        }

        /* summary line */
        DINFO("chkid %s chknum %u snap_from %ju snap_version %ju snap_rollback %ju has_auto_snap %d\n",
              id2str(&table1->chkid),
              ctx->chknum,
              fileinfo->snap_from,
              fileinfo->snap_version,
              fileinfo->snap_rollback,
              ctx->has_auto_snap);

        /* root snapshots */
        list_for_each_safe(pos, n, &ctx->root_snap_list) {
                snap = list_entry(pos, snap_t, hook);
                DINFO("root key %s snap_from %ju snap_version %ju\n",
                      snap->key, snap->snap_from, snap->snap_version);
        }

        /* auto snapshots */
        list_for_each_safe(pos, n, &ctx->auto_snap_list) {
                snap = list_entry(pos, snap_t, hook);
                DINFO("auto key %s snap_from %ju snap_version %ju\n",
                      snap->key, snap->snap_from, snap->snap_version);
        }

        /* downstream snapshots */
        list_for_each_safe(pos, n, &ctx->downstream_snap_list) {
                snap = list_entry(pos, snap_t, hook);
                DINFO("downstream key %s snap_from %ju snap_version %ju\n",
                      snap->key, snap->snap_from, snap->snap_version);
        }

        /* cross snapshots */
        list_for_each_safe(pos, n, &ctx->cross_snap_list) {
                snap = list_entry(pos, snap_t, hook);
                DINFO("cross key %s snap_from %ju snap_version %ju\n",
                      snap->key, snap->snap_from, snap->snap_version);
        }

        /* rollback statistics */
        DINFO("stat chknum %ju allocate %ju update %ju auto w %ju vol r %ju w %ju snap r %ju e %ju\n",
              ctx->stat.chknum,
              ctx->stat.allocate,
              ctx->stat.update_meta,
              ctx->stat.auto_write,
              ctx->stat.vol_read,
              ctx->stat.vol_write,
              ctx->stat.snap_read,
              ctx->stat.snap_exist);

        return 0;
}

/*
 * Handler that volume_ctl_snapshot_rollback() hands to core_request().
 *
 * Reads (volid, name) from ap, invokes the volume's snapshot_rollback()
 * method under the volume read lock and, for VOLUME_FORMAT_RAW volumes,
 * registers a ROLLBACK_ROOT background task for the volume.
 *
 * Returns 0 on success. EEXIST from bh_task_create() is reported as EBUSY;
 * other bh_task_create() errors are passed through _errno() first.
 */
STATIC int __volume_ctl_snapshot_rollback(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        va_end(ap);

        int ret, retry = 0;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        char pool[MAX_NAME_LEN];
        volume_format_t format;

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;

        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        /* copy what is needed after the lock and the cache entry are dropped */
        /* NOTE(review): unbounded copy -- assumes table1.pool fits in MAX_NAME_LEN; confirm */
        strcpy(pool, volume_proto->table1.pool);
        format = lich_volume_format(&volume_proto->table1.fileinfo);

        // - create auto snap if fileinfo.snap_from is leaf node
        // - update snap_rollback
        // - add task to MQ
        ret = volume_proto->snapshot_rollback(volume_proto, name);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* from here on cent is neither locked nor held; error paths go straight to err_ret */
        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        if (format == VOLUME_FORMAT_RAW) {
                // kick off the rollback background task
        retry:
                ret = bh_task_create(pool, ROLLBACK_ROOT, volid);
                if (unlikely(ret)) {
                        if (ret == EEXIST) {
                                /* a rollback task for this volume is already registered */
                                ret = EBUSY;
                                GOTO(err_ret, ret);
                        } else {
                                ret = _errno(ret);
                                if (ret == EAGAIN) {
                                        /*
                                         * presumably sleeps and jumps back to the retry label until
                                         * the limit is exhausted, then leaves via err_ret -- confirm
                                         * against the USLEEP_RETRY definition
                                         */
                                        USLEEP_RETRY(err_ret, ret, retry, retry,
                                                     gloconf.rpc_timeout * 2, (1000 * 1000));
                                } else
                                        GOTO(err_ret, ret);
                        }

                        /* only reached if USLEEP_RETRY falls through without jumping */
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Start rolling back a volume to the snapshot called name.
 *
 * The request is dispatched through core_request(), using the core selected
 * by core_hash(volid), with __volume_ctl_snapshot_rollback as the handler.
 *
 * @param volid volume to roll back
 * @param name  snapshot name, forwarded to the handler
 * @return 0 on success, otherwise the error code returned by core_request()
 */
int volume_ctl_snapshot_rollback(const volid_t *volid, const char *name)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_rollback",
                               __volume_ctl_snapshot_rollback, volid, name);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Record that chunk arg->chunk_idx is done and, from time to time, persist
 * the rollback break-point.
 *
 * The chunk is marked in the bitmap at offset (chunk_idx - bp). Once that
 * offset is more than VOLUME_CTL_BP_INTERVAL past snapbp->curr and no other
 * update is in progress (bpupdate flag), the value of bmap_continuous() is
 * taken as the new break-point candidate and, if it differs from the current
 * one, stored via volume_ctl_setbp().
 *
 * Returns the bmap_set() error if marking fails, otherwise 0. A failing
 * volume_ctl_setbp() is only logged.
 */
STATIC int __rollback_bh1_chunk_setbp(snapshot_rollback_arg_t *arg)
{
        int ret, curr;
        snapshot_rollback_bp_t *snapbp = arg->snapbp;

        ret = bmap_set(snapbp->bmap, arg->chunk_idx - snapbp->bp);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* not far enough ahead yet, or somebody else is already updating */
        if ((arg->chunk_idx - snapbp->bp) - snapbp->curr <= VOLUME_CTL_BP_INTERVAL
                        || snapbp->bpupdate != FALSE) {
                return 0;
        }

        snapbp->bpupdate = TRUE;

        curr = bmap_continuous(snapbp->bmap, snapbp->curr);
        if (snapbp->curr != curr) {
                ret = volume_ctl_setbp(arg->cent, LICH_SYSTEM_ATTR_ROLLBACK_BP, snapbp->bp + curr);
                if (ret == 0) {
                        DINFO("vol "CHKID_FORMAT" setbp %d --> %d successful\n",
                                        CHKID_ARG(&((volume_proto_t *)arg->cent->value)->chkid), snapbp->curr, curr);

                        /* advance only after the break-point has been persisted */
                        snapbp->curr = curr;
                } else {
                        DWARN("vol "CHKID_FORMAT" setbp %d fail:%d\n",
                                        CHKID_ARG(&((volume_proto_t *)arg->cent->value)->chkid), curr, ret);
                }
        }

        snapbp->bpupdate = FALSE;

        return 0;
err_ret:
        return ret;
}

/*
 * Roll back a single chunk; _arg is a heap-allocated snapshot_rollback_arg_t
 * that this function always frees before returning.
 *
 * Under the volume read lock: derives the chunk id from arg->chunk_idx,
 * bails out with ECANCELED if the volume carries __FILE_ATTR_DELETE__, bumps
 * the chknum statistic and calls the volume's snapshot_rollback_bh1() method
 * (ENOENT from it is tolerated). After unlocking, progress is recorded via
 * __rollback_bh1_chunk_setbp().
 *
 * With ENABLE_SNAPSHOT_ROLLBACK_MULTITASK, *arg->task_count is decremented
 * and arg->cond is broadcast with 0 on success or the error code on failure.
 * The function itself returns nothing.
 */
STATIC void __rollback_bh1_chunk(void *_arg)
{
        int ret, deleting;
        chkid_t chkid;
        snapshot_rollback_arg_t *arg = _arg;
        volume_proto_t *volume_proto;

        ret = __volume_ctl_rdlock(arg->cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = arg->cent->value;
        fid2cid(&chkid, &volume_proto->chkid, arg->chunk_idx);

        /* stop working on a volume that is flagged for deletion */
        deleting = volume_proto->table1.fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_lock, ret);
        }

        volume_proto->rollback_ctx.stat.chknum++;

        ret = volume_proto->snapshot_rollback_bh1(volume_proto, &chkid);
        if (unlikely(ret)) {
                /* ENOENT is not treated as a failure for this chunk */
                if (ret != ENOENT) {
                        GOTO(err_lock, ret);
                }
        }

        __volume_ctl_unlock(arg->cent);

        /* mark the chunk done and possibly persist the break-point */
        ret = __rollback_bh1_chunk_setbp(arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if ENABLE_SNAPSHOT_ROLLBACK_MULTITASK
        /* report completion to the waiter in __volume_ctl_rollback_bh1 */
        (*arg->task_count)--;
        co_cond_broadcast(arg->cond, 0);
#endif
        yfree((void **)&arg);
        return;

err_lock:
        __volume_ctl_unlock(arg->cent);
err_ret:
#if ENABLE_SNAPSHOT_ROLLBACK_MULTITASK
        /* same bookkeeping as on success, but the waiter receives the error code */
        (*arg->task_count)--;
        co_cond_broadcast(arg->cond, ret);
        yfree((void **)&arg);

        /* we need retry again, but schedule will trapped in the dead circle, run a task create a new task */
        /*
        schedule_task_new("snapshot_rollback", __rollback_bh1_chunk, arg, -1);
        */
#else
        yfree((void **)&arg);
#endif
}

/*
 * First rollback stage: process every chunk from index bp up to the volume's
 * chunk count by running __rollback_bh1_chunk for each of them.
 *
 * cent  cache entry of the volume
 * bmap  bitmap used to track finished chunks (indexed relative to bp)
 * bp    break-point, i.e. the first chunk index to process
 *
 * Returns 0 once all chunks are done and the break-point has been set to the
 * chunk count; EAGAIN if any wait reported an error; ECANCELED if a wait was
 * cancelled; or the error of ymalloc()/volume_ctl_setbp()/the read lock.
 */
STATIC int __volume_ctl_rollback_bh1(mcache_entry_t *cent, bmap_t *bmap, uint32_t bp)
{
        int ret, chknum, i, err_count = 0;
        volume_proto_t *volume_proto;
        table1_t *table1;
        snapshot_rollback_bp_t snapbp;
        snapshot_rollback_arg_t *arg;

        /* the lock is only held long enough to compute the chunk count */
        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        table1 = &volume_proto->table1;
        chknum = size2chknum(table1->fileinfo.size, &table1->fileinfo.ec);

        __volume_ctl_unlock(cent);

        /*
         * task_count, cond and snapbp live on this stack frame and their
         * addresses are handed to every chunk task below -- presumably the
         * reason each exit taken while tasks are outstanding first waits for
         * task_count to drop to 0.
         */
        int task_count = 0;
        co_cond_t cond;
        co_cond_init(&cond);

        snapbp.bmap = bmap;
        snapbp.bp = bp;
        snapbp.bpupdate = FALSE;
        snapbp.curr = 0;

        // TODO performance: parallelize
        // TODO stop gracefully

        YASSERT(bp <= chknum);
        for (i = bp; i < chknum; i++) {

                /*
                 * Throttle before launching the next task.
                 * NOTE(review): the test is '>', so up to
                 * SNAPSHOT_ROLLBACK_MULTITASK + 1 tasks can be in flight --
                 * confirm that this is intended.
                 */
                while (task_count > SNAPSHOT_ROLLBACK_MULTITASK) {
                        ret = co_cond_wait2(&cond, "snapshot_rollback");
                        if (unlikely(ret)) {
                                if (ret == ECANCELED) {
                                        DWARN("i %d cancelled\n", i);
                                        goto err_canceled;
                                } else {
                                        /* remember the failure, keep going; reported as EAGAIN at the end */
                                        err_count++;
                                        DWARN("i %d chknum %d ret %d err %d\n", i, chknum, ret, err_count);
                                }
                        }
                }

                /* ownership of arg passes to __rollback_bh1_chunk, which frees it */
                ret = ymalloc((void **)&arg, sizeof(snapshot_rollback_arg_t));
                if (unlikely(ret)) {
                        GOTO(err_canceled, ret);
                }

                arg->cent = cent;
                arg->chunk_idx = i;
                arg->task_count = &task_count;
                arg->cond = &cond;
                arg->snapbp = &snapbp;

#if ENABLE_SNAPSHOT_ROLLBACK_MULTITASK
                task_count++;
                schedule_task_new("snapshot_rollback", __rollback_bh1_chunk, arg, -1);
#else
                /*
                 * NOTE(review): __rollback_bh1_chunk is defined in this file as
                 * returning void, so this assignment cannot compile when
                 * ENABLE_SNAPSHOT_ROLLBACK_MULTITASK is 0 -- the serial path
                 * looks stale.
                 */
                ret = __rollback_bh1_chunk(arg);
                if (unlikely(ret)) {
                        if (ret == ENOENT || ret == ENOKEY) {
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }
#endif
        }

        /* drain: wait until every scheduled task has reported back */
        while (task_count > 0) {
                ret = co_cond_wait2(&cond, "snapshot_rollback");
                if (unlikely(ret)) {
                        err_count++;
                        DWARN("ret %d err %d\n", ret, err_count);
                }
        }

        /* any recorded failure makes the whole pass retryable */
        if (err_count) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        /* all chunks done: move the break-point to the end */
        ret = volume_ctl_setbp(cent, LICH_SYSTEM_ATTR_ROLLBACK_BP, chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_canceled:
        /* wait for outstanding tasks; their results are ignored, ret keeps the original error */
        while (task_count > 0) {
                co_cond_wait2(&cond, "snapshot_rollback");
        }
err_ret:
        return ret;
}

/*
 * Second rollback stage: call the volume's snapshot_rollback_bh2() method
 * while holding the volume read lock.
 *
 * cent    cache entry of the volume
 * snapid  passed through to snapshot_rollback_bh2()
 *
 * Returns 0 on success, otherwise the error of the lock or of the method.
 */
STATIC int __volume_ctl_rollback_bh2(mcache_entry_t *cent, volid_t *snapid)
{
        volume_proto_t *volume_proto;
        int ret;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->snapshot_rollback_bh2(volume_proto, snapid);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/*
 * Final rollback stage, locked part: call the volume's snapshot_rollback_bh()
 * method with buf while holding the volume write lock, then reset ltime to 0
 * and log the reset.
 *
 * cent  cache entry of the volume
 * buf   buffer passed through to snapshot_rollback_bh()
 *
 * Returns 0 on success, otherwise the error of the lock or of the method.
 */
STATIC int __volume_ctl_rollback_bh__(mcache_entry_t *cent, buffer_t *buf)
{
        volume_proto_t *volume_proto;
        int ret;

        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->snapshot_rollback_bh(volume_proto, buf);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        /* fetch the value again after the call, as the original code does */
        volume_proto = cent->value;
        volume_proto->ltime = 0;
        DINFO(""CHKID_FORMAT" reset\n", CHKID_ARG(&volume_proto->chkid));

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/*
 * Final rollback stage: read the snapshot's chunk table into a temporary
 * buffer and hand it to __volume_ctl_rollback_bh__(), which applies it to the
 * volume. The buffer is released on every path once it has been initialised.
 *
 * cent    cache entry of the volume
 * snapid  id of the snapshot whose table is read
 *
 * Returns 0 on success, otherwise the error of the failing step.
 */
STATIC int __volume_ctl_rollback_bh(mcache_entry_t *cent, fileid_t *snapid)
{
        buffer_t buf;
        int ret;

        ret = mbuffer_init(&buf, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* source of the xattr data */
        /* TODO retry: ENOENT */
        ret = md_chunk_table_read(snapid, snapid, &buf, LICH_CHUNK_SPLIT, 0);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        /* update xattr and fileinfo (snap_version, snap_rollback, snap_from etc) */
        ret = __volume_ctl_rollback_bh__(cent, &buf);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        mbuffer_free(&buf);

        return 0;
err_free:
        mbuffer_free(&buf);
err_ret:
        return ret;
}

/**
 * Background half of a snapshot rollback for one volume.
 *
 * Reloads and dumps the rollback context, then -- if snap_version differs
 * from snap_rollback -- resumes from the stored break-point and runs the
 * three stages __volume_ctl_rollback_bh1 (per-chunk work),
 * __volume_ctl_rollback_bh2 and __volume_ctl_rollback_bh (restore xattr,
 * update fileinfo). Finally the ROLLBACK_ROOT task is removed and ltime is
 * reset to 0. If no rollback is needed, only the task is removed.
 *
 * Scenarios to keep in mind:
 * 1. failure points
 * 2. re-entering this procedure
 *
 * If a failure happens after snap_version and snap_rollback have been
 * updated but before the task is removed, this procedure is triggered again
 * and again.
 *
 * @param volid volume to process
 * @return 0 on success or when nothing had to be done, otherwise the error
 *         code of the failing step
 */
int volume_ctl_snapshot_rollback_bh(const volid_t *volid)
{
        int ret, need_rollback;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto = NULL;
        fileinfo_t *fileinfo;
        volid_t snapid;
        char pool[MAX_NAME_LEN];

        /* for rollback resume from break-point */
        uint32_t bp;
        bmap_t bmap;

        /* timestamps around the stages, only used for the DBUG line at the end */
        struct timeval t1, t2, t3, t4, t5;

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        fileinfo = &volume_proto->table1.fileinfo;

        // first copy out the volume_proto attributes that are needed later
        strcpy(pool, volume_proto->table1.pool);
        need_rollback = (fileinfo->snap_version != fileinfo->snap_rollback);

        /* NOTE(review): the return value of rollback_context_reload() is not checked here */
        rollback_context_reload(volume_proto);
        rollback_context_dump(volume_proto);

        __volume_ctl_unlock(cent);

        if (!need_rollback) {
                // remove the volume's task; the return value is deliberately ignored
                bh_task_remove(pool, ROLLBACK_ROOT, volid);

                goto out;
        }

        // get rollback break-point
        ret = volume_ctl_getbp(cent, LICH_SYSTEM_ATTR_ROLLBACK_BP, &bp);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* maybe bp == chknum, so bmap reserved 1bit*/
        /* NOTE(review): fileinfo is dereferenced here after the read lock was dropped */
        ret = bmap_create(&bmap, size2chknum(fileinfo->size, &fileinfo->ec) - bp + 1);
        if (unlikely(ret))
                GOTO(err_release, ret);

        _gettimeofday(&t1, NULL);

        // save & restore chunk
        ret = __volume_ctl_rollback_bh1(cent, &bmap, bp);
        if (unlikely(ret))
                GOTO(err_free, ret);

        _gettimeofday(&t2, NULL);

        ANALYSIS_BEGIN(0);

        // check completion, clean up the auto snap after the target snapshot (if exists), etc.
        /* snapid is passed uninitialised; presumably an output of this call, since it is consumed below -- confirm */
        ret = __volume_ctl_rollback_bh2(cent, &snapid);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        _gettimeofday(&t3, NULL);

        // restore xattr, update snap_version, etc.
        ret = __volume_ctl_rollback_bh(cent, &snapid);
        if (unlikely(ret))
                GOTO(err_free, ret);

        _gettimeofday(&t4, NULL);

        /* bmap is gone from here on, so later failures leave via err_release, not err_free */
        bmap_destroy(&bmap);
        /**
         * set break-point to original value(0) or remove break-point.
         *
         * rollback break-point only create by this function.
         * and xattr already be restored by __volume_ctl_rollback_bh.
         * so, we don't need this step.
         */
        /*
        ret = volume_ctl_rmbp(cent, LICH_SYSTEM_ATTR_ROLLBACK_BP);
        if (unlikely(ret))
                GOTO(err_release, ret);
        */

        // --- remove the task
        ret = bh_task_remove(pool, ROLLBACK_ROOT, volid);
        if (unlikely(ret)) {
                /* an already-missing task is fine */
                if (ret != ENOENT)
                        GOTO(err_release, ret);
        }

        {
                // trigger the reload procedure
                ret = __volume_ctl_wrlock(cent);
                if (unlikely(ret))
                        GOTO(err_release, ret);

                volume_proto = cent->value;
                volume_proto->ltime = 0;
                DINFO(""CHKID_FORMAT" reset\n", CHKID_ARG(volid));

                rollback_context_dump(volume_proto);

                __volume_ctl_unlock(cent);
        }

        _gettimeofday(&t5, NULL);

        /* elapsed time of: bh1, bh2, bh, and task removal plus reset */
        int64_t used1 = _time_used(&t1, &t2);
        int64_t used2 = _time_used(&t2, &t3);
        int64_t used3 = _time_used(&t3, &t4);
        int64_t used4 = _time_used(&t4, &t5);
        DBUG("used %jd %jd %jd %jd\n", used1, used2, used3, used4);

out:
        __volume_ctl_release(cent);
        return 0;
err_free:
        bmap_destroy(&bmap);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}
